test: create a test for m0002010 using DS4#2243
Conversation
Reviewer's Guide

Refactors migration dump handling into a reusable URL-based `Dumps` abstraction, extends decompression and embedded DB import to support gzip as well as xz, introduces Btrfs-backed snapshot-aware migration test contexts, and adds a long-running DS4-based performance test for migration m0002010, while updating CI workflows and dependencies accordingly.

Class diagram for updated migration data handling

```mermaid
classDiagram
    class Runner {
        +Database database
        +DispatchBackend storage
        +Options options
    }
    class Database {
        <<enum>>
        Config
        Provided
        +try_into_connection() DbErr DatabaseConnection
    }
    class ConnectOptions {
        +new(url: String) ConnectOptions
        +set_schema_search_path(schema: String) ConnectOptions
    }
    class DatabaseConnection
    class SchemaManager {
        +get_connection() DatabaseConnection
        +process~D~(db: ConnectionTrait, storage: DispatchBackend, options: Options, f: Handler~D~) DbErr
    }
    class SchemaDataManager {
        +SchemaManager manager
        +DatabaseConnection db
        +DispatchBackend storage
        +Options options
        +new(manager: SchemaManager, db: DatabaseConnection, storage: DispatchBackend, options: Options)
        +process~D~(storage: DispatchBackend, options: Options, f: Handler~D~) DbErr
    }
    class Options {
        +usize concurrent
        +u64 current
        +u64 total
        +bool skip_all
        +bool skip_data
        +new() Options
    }
    class DocumentProcessor {
        <<interface>>
        +process~D~(db: ConnectionTrait, storage: DispatchBackend, options: Options, f: Handler~D~) DbErr
    }
    class DispatchBackend
    class Handler {
        <<interface>>
    }
    class ConnectionTrait {
        <<interface>>
    }
    class TransactionTrait {
        <<interface>>
    }
    Runner --> Database
    Runner --> Options
    Runner --> DispatchBackend
    Database --> DatabaseConnection
    Database --> ConnectOptions
    SchemaDataManager --> SchemaManager
    SchemaDataManager --> DatabaseConnection
    SchemaDataManager --> DispatchBackend
    SchemaDataManager --> Options
    DocumentProcessor <|.. SchemaManager
    SchemaManager --> DispatchBackend
    SchemaManager --> Options
    SchemaManager --> ConnectionTrait
    ConnectionTrait <|.. DatabaseConnection
    TransactionTrait <|.. DatabaseConnection
    Options ..> num_cpus
```
Flow diagram for dump-based migration test and decompression

```mermaid
flowchart LR
    A[Test runner] --> B[Migration test context]
    B --> C[Dumps helper]
    C --> D{Dump source}
    D -->|Migration URL| E[Download dump from S3]
    D -->|Local path| F[Use existing dump file]
    E --> G[Local dump file]
    F --> G
    G --> H[decompress_async_read]
    G --> I[decompress_read]
    H --> J[Embedded database import]
    I --> J
    J --> K[Embedded PostgreSQL instance]
    K --> L[Runner]
    L --> M[Database.try_into_connection]
    M --> N[SchemaManager]
    N --> O[SchemaDataManager]
    O --> P[DocumentProcessor.process]
    P --> Q[Parallel document handling using buffer_unordered]
    Q --> R[Migration m0002010 completed on DS4 dump]
```
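The final "parallel document handling" step can be sketched with plain threads. The snippet below is only loosely analogous to `buffer_unordered` (which bounds in-flight futures, not threads), and doubling a number stands in for real per-document work:

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Limit in-flight work to `concurrent` workers, loosely analogous to the
// `buffer_unordered(concurrent)` step in the flow above.
fn process_documents(docs: Vec<u64>, concurrent: usize) -> u64 {
    let queue = Arc::new(Mutex::new(docs.into_iter()));
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for _ in 0..concurrent.max(1) {
        let queue = Arc::clone(&queue);
        let tx = tx.clone();
        handles.push(thread::spawn(move || loop {
            // take the next document, releasing the lock before working on it
            let doc = queue.lock().unwrap().next();
            match doc {
                Some(doc) => tx.send(doc * 2).unwrap(), // stand-in for real work
                None => break,
            }
        }));
    }
    drop(tx); // close the channel once all workers hold their own sender
    let total: u64 = rx.iter().sum();
    for handle in handles {
        handle.join().unwrap();
    }
    total
}

fn main() {
    assert_eq!(process_documents(vec![1, 2, 3, 4], 2), 20);
}
```

Unlike this sketch, the async version also keeps results out of order and never blocks a thread per document; the shared-queue shape is what carries over.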
File-Level Changes
Possibly linked issues
Force-pushed ce8f274 → a1499eb

Force-pushed d674a6d → 7920244
Hey - I've found 1 issue, and left some high level feedback:
- In `Dumps::provide_raw`/`download_artifacts_raw` a fresh `reqwest::Client` is created for each invocation; consider keeping a shared client on `Dumps` to take advantage of connection pooling and reduce overhead for repeated test runs.
- The new `log::info!` calls in `download_artifacts_raw` (progress output) and in `FileSystemBackend::find` for missing files may be quite noisy in normal test runs; consider downgrading these to `debug` (or gating them) so info-level logs remain focused on higher-level events.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `Dumps::provide_raw`/`download_artifacts_raw` a fresh `reqwest::Client` is created for each invocation; consider keeping a shared client on `Dumps` to take advantage of connection pooling and reduce overhead for repeated test runs.
- The new `log::info!` calls in `download_artifacts_raw` (progress output) and in `FileSystemBackend::find` for missing files may be quite noisy in normal test runs; consider downgrading these to `debug` (or gating them) so info-level logs remain focused on higher‑level events.
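A minimal sketch of the shared-client idea, using a std-only stand-in for `reqwest::Client` (the real field would simply hold a `reqwest::Client`, which is cheap to clone and pools connections internally; the `Dumps` shape here is an assumption):

```rust
use std::sync::OnceLock;

// Stand-in for `reqwest::Client`; the field exists so instance identity
// can be observed in the assertions below.
struct Client {
    user_agent: String,
}

struct Dumps {
    client: OnceLock<Client>,
}

impl Dumps {
    fn new() -> Self {
        Self { client: OnceLock::new() }
    }

    /// Lazily create the client on first use, then reuse the same instance
    /// for every subsequent download.
    fn client(&self) -> &Client {
        self.client.get_or_init(|| Client {
            user_agent: "trustify-tests".to_owned(),
        })
    }
}

fn main() {
    let dumps = Dumps::new();
    let first = dumps.client() as *const Client;
    let second = dumps.client() as *const Client;
    assert_eq!(first, second); // the same client instance is reused
    assert_eq!(dumps.client().user_agent, "trustify-tests");
}
```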
## Individual Comments
### Comment 1
<location path="test-context/src/migration.rs" line_range="274-329" />
<code_context>
+/// Manage raw dump downloads
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding focused tests for `Dumps::provide`/`provide_raw` covering cache reuse, locking behavior, and digest/no-digest modes.
These methods are now a key path for downloading and caching dump artifacts, but their behavior isn’t directly tested. Adding a few focused unit/integration tests would help:
* Cache reuse: Call `provide` twice for the same `Dump` (using a local test server or file-based mock) and assert that the second call doesn’t trigger another download (e.g., via a request counter).
* Digest vs no-digest: In `digests = true` mode, verify success with matching `.sha256` files and failure with mismatched ones; in `digests = false` mode, ensure missing `.sha256` files don’t error.
* Locking: Call `provide` concurrently from two tasks for the same URL and assert both succeed with uncorrupted content, confirming that locking prevents duplicate or conflicting downloads.
Given that many migration and external dump tests rely on this code, these tests would materially improve reliability and debuggability.
Suggested implementation:
```rust
/// Manage raw dump downloads
#[derive(Debug)]
pub struct Dumps {
base: PathBuf,
}
```
Here are the code changes:
```xml
<file_operations>
<file_operation operation="edit" file_path="test-context/src/migration.rs">
<<<<<<< SEARCH
/// Manage raw dump downloads
#[derive(Debug)]
pub struct Dumps {
base: PathBuf,
}
=======
/// Manage raw dump downloads
#[derive(Debug)]
pub struct Dumps {
base: PathBuf,
}
>>>>>>> REPLACE
</file_operation>
</file_operations>
<additional_changes>
Please append the following `#[cfg(test)]` module to the end of `test-context/src/migration.rs` (after the `impl Dumps` block and any associated functions like `provide` / `provide_raw`). The tests assume that:
- `Dumps::new()` uses the `TRUSTIFY_MIGRATION_DUMPS` environment variable as the base cache directory.
- `Dumps::provide(&self, dump: &Dump<'_, String>) -> anyhow::Result<PathBuf>` (or similar) exists and downloads/caches the dump files defined in `Dump::files`.
- `Dumps::provide_raw(...)` is called internally by `provide` and handles locking/digest verification as implied by your comment.
If your actual signatures differ slightly, adjust the parameter/return types accordingly but keep the test logic intact.
```rust
#[cfg(test)]
mod tests {
use super::{Dump, Dumps};
use std::env;
use std::fs;
use std::io::Write;
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::thread;
use std::time::Duration;
use sha2::{Digest, Sha256};
use tempfile::TempDir;
/// Simple HTTP response writer for our tiny test server.
fn write_http_response(mut stream: TcpStream, status: &str, body: &[u8]) {
let headers = format!(
"HTTP/1.1 {status}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
);
let _ = stream.write_all(headers.as_bytes());
let _ = stream.write_all(body);
}
/// A tiny HTTP test server that serves three endpoints:
///
/// - /dump.bin => dump content (counts requests)
/// - /dump.bin.sha256 => hex-encoded sha256 of dump content
/// - /bad.bin.sha256 => incorrect sha256 (for negative test)
struct TestServer {
addr: String,
request_count: Arc<AtomicUsize>,
_handle: thread::JoinHandle<()>,
}
impl TestServer {
fn start() -> Self {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
let addr = listener.local_addr().unwrap().to_string();
let request_count = Arc::new(AtomicUsize::new(0));
let request_count_clone = Arc::clone(&request_count);
let handle = thread::spawn(move || {
// Fixed content for dump.bin
let content = b"test-dump-content";
let mut hasher = Sha256::new();
hasher.update(content);
let good_digest = format!("{:x}", hasher.finalize());
let bad_digest = "0000000000000000000000000000000000000000000000000000000000000000";
for stream in listener.incoming() {
if let Ok(stream) = stream {
let mut buf = [0u8; 1024];
let read_len = match stream.peek(&mut buf) {
Ok(n) => n,
Err(_) => continue,
};
let req = String::from_utf8_lossy(&buf[..read_len]);
let path = if let Some(line) = req.lines().next() {
line.split_whitespace().nth(1).unwrap_or("/")
} else {
"/"
};
request_count_clone.fetch_add(1, Ordering::SeqCst);
match path {
"/dump.bin" => {
write_http_response(stream, "200 OK", content);
}
"/dump.bin.sha256" => {
write_http_response(stream, "200 OK", good_digest.as_bytes());
}
"/bad.bin.sha256" => {
write_http_response(stream, "200 OK", bad_digest.as_bytes());
}
_ => {
write_http_response(stream, "404 Not Found", b"");
}
}
}
}
});
TestServer {
addr,
request_count,
_handle: handle,
}
}
fn base_url(&self) -> String {
format!("http://{}", self.addr)
}
fn request_count(&self) -> usize {
self.request_count.load(Ordering::SeqCst)
}
}
/// Helper to configure `Dumps` with a temporary base directory.
fn make_dumps_with_tempdir() -> (Dumps, TempDir) {
let tmp = TempDir::new().expect("tempdir");
env::set_var("TRUSTIFY_MIGRATION_DUMPS", tmp.path());
let dumps = Dumps::new().expect("Dumps::new");
(dumps, tmp)
}
fn assert_file_contents_eq(path: &Path, expected: &[u8]) {
let data = fs::read(path).expect("read cached file");
assert_eq!(data, expected);
}
#[test]
fn cache_is_reused_on_subsequent_provide_calls() {
let server = TestServer::start();
let (dumps, _tmp) = make_dumps_with_tempdir();
let url_base = server.base_url();
let dump = Dump {
url: &url_base,
files: &["dump.bin".to_string()],
digests: true,
};
// First call should trigger a download.
let path1 = dumps.provide(&dump).expect("first provide");
assert!(path1.is_dir(), "expected provide to return a directory");
// Second call should reuse the cached content.
let path2 = dumps.provide(&dump).expect("second provide");
assert_eq!(path1, path2, "expected same cache directory to be reused");
// We know the server is used, but the key assertion is that
// a second call does not cause an extra download of dump.bin.
// This relies on `Dumps::provide` checking the cache/lock correctly.
let count = server.request_count();
assert!(
count <= 3,
"expected limited HTTP requests due to cache reuse, got {count}"
);
}
#[test]
fn digest_verification_succeeds_and_fails_as_expected() {
let server = TestServer::start();
let (dumps, _tmp) = make_dumps_with_tempdir();
let url_base = server.base_url();
// digests = true, matching sha256
let dump_ok = Dump {
url: &url_base,
files: &["dump.bin".to_string()],
digests: true,
};
let dir_ok = dumps.provide(&dump_ok).expect("provide with matching digest");
let file_ok = dir_ok.join("dump.bin");
assert!(file_ok.exists(), "file should exist after successful provide");
assert_file_contents_eq(&file_ok, b"test-dump-content");
// digests = true, but we point to a mismatched .sha256 file.
// This assumes `provide` / `provide_raw` uses `{file}.sha256` from the same base URL.
let dump_bad = Dump {
url: &url_base,
files: &["bad.bin".to_string()],
digests: true,
};
let result_bad = dumps.provide(&dump_bad);
assert!(
result_bad.is_err(),
"expected provide to fail when digest does not match"
);
// digests = false: missing .sha256 must not cause an error.
let dump_no_digest = Dump {
url: &url_base,
files: &["dump.bin".to_string()],
digests: false,
};
let dir_no_digest = dumps
.provide(&dump_no_digest)
.expect("provide without digest even if .sha256 is missing/unused");
let file_no_digest = dir_no_digest.join("dump.bin");
assert!(file_no_digest.exists());
assert_file_contents_eq(&file_no_digest, b"test-dump-content");
}
#[test]
fn concurrent_provide_calls_are_locked_and_content_is_not_corrupted() {
use std::sync::Barrier;
let server = TestServer::start();
let (dumps, _tmp) = make_dumps_with_tempdir();
let url_base = server.base_url();
let dump = Dump {
url: &url_base,
files: &["dump.bin".to_string()],
digests: true,
};
// Run two threads that call `provide` concurrently for the same dump.
let barrier = Arc::new(Barrier::new(2));
let dumps1 = dumps.clone();
let dumps2 = dumps;
let dump1 = Dump {
url: dump.url,
files: dump.files,
digests: dump.digests,
};
let dump2 = Dump {
url: dump.url,
files: dump.files,
digests: dump.digests,
};
let b1 = barrier.clone();
let t1 = thread::spawn(move || {
b1.wait();
dumps1.provide(&dump1)
});
let b2 = barrier.clone();
let t2 = thread::spawn(move || {
b2.wait();
dumps2.provide(&dump2)
});
let res1 = t1.join().expect("thread 1 panicked");
let res2 = t2.join().expect("thread 2 panicked");
let dir1 = res1.expect("thread 1 provide failed");
let dir2 = res2.expect("thread 2 provide failed");
assert_eq!(
dir1, dir2,
"both concurrent calls should resolve to the same cache directory"
);
let file = dir1.join("dump.bin");
assert!(file.exists(), "cached file must exist");
assert_file_contents_eq(&file, b"test-dump-content");
}
}
```
Notes / adjustments you may need to make:
1. **Imports**
- If `sha2` or `tempfile` are not yet available in this crate, add them as test/dev dependencies in `test-context/Cargo.toml`:
```toml
[dev-dependencies]
sha2 = "0.10"
tempfile = "3"
```
- If your project already uses another hashing/tempdir crate, swap those in instead.
2. **`Dumps` API**
- If `Dumps` is not `Clone`, either:
- Implement `Clone` for `Dumps` (e.g., it only wraps a `PathBuf`), or
- Change the concurrent test to wrap `Dumps` in `Arc<Dumps>` and clone the `Arc` instead.
3. **`Dump` type**
- The tests use `Dump<'_, String>` with `files: &["dump.bin".to_string()]`.
If your `Dump` is usually constructed differently (e.g., `&[&str]`), update the tests to match.
4. **URL-to-path mapping**
- The tests assume `Dumps::provide` constructs URLs as `format!("{url}/{file}")`.
If your code uses a different convention, adjust `TestServer` paths and/or test URLs accordingly.
These tests will directly cover cache reuse, digest behavior, and locking semantics around `Dumps::provide` / `provide_raw`, and should be easy to adapt to your exact signatures and internal conventions.
</issue_to_address>
Codecov Report

❌ Patch coverage is …

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #2243      +/-   ##
==========================================
- Coverage   68.10%   67.79%   -0.32%
==========================================
  Files         425      428       +3
  Lines       24886    25433     +547
  Branches    24886    25433     +547
==========================================
+ Hits        16949    17242     +293
- Misses       7018     7237     +219
- Partials      919      954      +35
```

☔ View full report in Codecov by Sentry.
Force-pushed e7e26d6 → a493616
```yaml
- run: df -h
```

```diff
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
```

```diff
       - name: Test
         # use only one job, trying to reduce memory usage
-        run: cargo llvm-cov --codecov --jobs 1 --features _test-s3 --output-path codecov.json
+        run: cargo llvm-cov --codecov --jobs 1 --all-features --output-path codecov.json
```

```diff
 | `EXTERNAL_TEST_DB` | Run tests against external test database if set | |
 | `EXTERNAL_TEST_DB_BOOTSTRAP` | Run tests against external test database if set | |
 | `MEM_LIMIT_MB` | Set memory limit for tests that use TrustifyContext, shows the memory usage when the test reaches the limit | `500 MiB` |
+| `TRUST_TEST_BTRFS_STORE` | Path to a BTRFS-backed directory for using snapshots for tests | User's home |
```
Force-pushed 2cfecee → 59fa48a
The logic treated `.sql` specially, as being uncompressed. However, there are more extensions (`.tar`) which indicate an uncompressed file. We can't distinguish these properly. So whatever we understand, we decompress; everything else we pass through as plain data and let the next step fail.
This speeds up starting a new database. Assisted-by: Claude Code
In the case of in-band migrations, the sea-orm migration manager executes all migrations inside a single transaction. This leads to the problem that concurrent operations are not possible. However, a possible strategy is to run the data migrations upfront, using concurrency, and then re-run the migrations while skipping the data migration.
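The two-phase strategy described above can be sketched as follows; the `Options` field names follow the class diagram earlier on this page, while the runner functions and migration names are hypothetical stand-ins:

```rust
// Sketch of the two-phase migration strategy: run data migrations upfront
// (concurrently, outside the single migration transaction), then re-run
// the regular migrations with `skip_data` set.
#[derive(Debug)]
struct Options {
    concurrent: usize,
    skip_all: bool,
    skip_data: bool,
}

// Stand-in for the data-migration step of m0002010.
fn run_data_migrations(options: &Options) -> Vec<&'static str> {
    if options.skip_all || options.skip_data {
        return vec![];
    }
    vec!["m0002010-data"]
}

// Stand-in for the regular schema migrations.
fn run_schema_migrations(_options: &Options) -> Vec<&'static str> {
    vec!["m0002010-schema"]
}

fn main() {
    // Phase 1: data migrations upfront, with concurrency.
    let upfront = Options { concurrent: 8, skip_all: false, skip_data: false };
    assert!(upfront.concurrent > 1);
    assert_eq!(run_data_migrations(&upfront), vec!["m0002010-data"]);

    // Phase 2: re-run the migrations, skipping the data part.
    let rerun = Options { concurrent: 1, skip_all: false, skip_data: true };
    assert!(run_data_migrations(&rerun).is_empty());
    assert_eq!(run_schema_migrations(&rerun), vec!["m0002010-schema"]);
}
```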
Force-pushed c89ae1a → f47d81c

@sourcery-ai review
Hey - I've found 5 issues, and left some high level feedback:
- The `Dumps::with_lock` helper is declared with an `impl AsyncFnOnce` parameter, which is not a stable or in-scope trait; consider changing the signature to take a closure `F: FnOnce() -> Fut` where `Fut: Future<Output = anyhow::Result<R>>` instead.
- In `download_artifacts_raw` you use `base.join(file).exists()` from the blocking std filesystem API inside async code; for large or remote filesystems this can stall the executor, so it would be better to switch to `tokio::fs::try_exists` or move that check into a `spawn_blocking` section.
- The snapshot setup path (e.g. `Snapshot::setup` and `fix_zstd` using `WalkDir` and `OpenOptions`) performs potentially heavy, blocking filesystem work directly in async functions; consider moving these loops into `spawn_blocking` to avoid blocking the async runtime when operating on large dumps.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `Dumps::with_lock` helper is declared with an `impl AsyncFnOnce` parameter, which is not a stable or in-scope trait; consider changing the signature to take a closure `F: FnOnce() -> Fut` where `Fut: Future<Output = anyhow::Result<R>>` instead.
- In `download_artifacts_raw` you use `base.join(file).exists()` from the blocking std filesystem API inside async code; for large or remote filesystems this can stall the executor, so it would be better to switch to `tokio::fs::try_exists` or move that check into a `spawn_blocking` section.
- The snapshot setup path (e.g. `Snapshot::setup` and `fix_zstd` using `WalkDir` and `OpenOptions`) performs potentially heavy, blocking filesystem work directly in async functions; consider moving these loops into `spawn_blocking` to avoid blocking the async runtime when operating on large dumps.
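The suggested stable signature can be sketched like this; `with_lock` is a simplified stand-in (the real helper would also acquire a file lock, and would use `anyhow::Result` rather than `String` errors), and the tiny `block_on` exists only so the example runs without an async runtime:

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Stable alternative to `impl AsyncFnOnce`: a closure returning a future.
async fn with_lock<R, F, Fut>(f: F) -> Result<R, String>
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = Result<R, String>>,
{
    // ... acquire the lock here ...
    let result = f().await;
    // ... release the lock here ...
    result
}

// Tiny poll-once executor, sufficient because the example futures are
// always immediately ready.
fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
    fn raw_waker() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker {
            raw_waker()
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    let waker = unsafe { Waker::from_raw(raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => unreachable!("example futures never yield"),
    }
}

fn main() {
    let out = block_on(with_lock(|| async { Ok::<_, String>(42) }));
    assert_eq!(out, Ok(42));
}
```

The `F: FnOnce() -> Fut` split keeps callers on stable Rust today, while reading almost identically at the call site.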
## Individual Comments
### Comment 1
<location path="common/db/src/embedded.rs" line_range="82-84" />
<code_context>
-
- let source = tokio::fs::File::open(&path).await?;
- let source = BufReader::new(source);
+ log::info!("Importing database from: {}", path.display());
- let source: Pin<Box<dyn AsyncRead + Send>> = match path
- .extension()
- .and_then(|ext| ext.to_str())
- {
- None | Some("sql") => Box::pin(source),
- Some("xz") => Box::pin(async_compression::tokio::bufread::LzmaDecoder::new(source)),
- Some(ext) => anyhow::bail!("Unsupported file type ({ext})"),
- };
+ let source = decompress_async_read(path).await?;
super::Database::import(&config, source)
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Change from failing on unsupported extensions to transparently passing them through may hide format errors
The previous logic failed fast on unsupported extensions, giving clear feedback for cases like `.sql.xz`/`.sql` mismatches or unexpected compression types. `decompress_async_read` now treats unknown extensions as uncompressed, which can defer errors until SQL parsing and make failures harder to diagnose. To keep the earlier, clearer failure mode, consider enforcing an allowlist of extensions here or adding a strict-mode option to `decompress_async_read` that does so.
Suggested implementation:
```rust
Source::Import(path) => {
log::info!("Importing database from: {}", path.display());
// Enforce an allowlist of supported file extensions to fail fast on unexpected formats.
match path.extension().and_then(|ext| ext.to_str()) {
None | Some("sql") | Some("xz") => {}
Some(ext) => anyhow::bail!("Unsupported file type ({ext})"),
}
let source = decompress_async_read(path).await?;
super::Database::import(&config, source)
.await
```
If `anyhow` is not already in scope in this file, add an appropriate import (e.g. `use anyhow::bail;` and then use `bail!` instead of `anyhow::bail!`, or ensure `anyhow` is available as a crate path). No changes to `decompress_async_read` are strictly required with this approach, since the allowlist is enforced at the call site.
</issue_to_address>
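The allowlist idea can be isolated into a small, std-only check; the accepted extension set (`sql`, `xz`, `gz`, or none) is an assumption based on the formats this PR discusses, and the helper name is hypothetical:

```rust
use std::path::Path;

// Fail fast on extensions the import path cannot handle, instead of
// deferring the error to SQL parsing.
fn check_dump_extension(path: &Path) -> Result<(), String> {
    match path.extension().and_then(|ext| ext.to_str()) {
        None | Some("sql") | Some("xz") | Some("gz") => Ok(()),
        Some(ext) => Err(format!("Unsupported file type ({ext})")),
    }
}

fn main() {
    assert!(check_dump_extension(Path::new("dump.sql")).is_ok());
    assert!(check_dump_extension(Path::new("dump.sql.xz")).is_ok());
    assert!(check_dump_extension(Path::new("dump.sql.gz")).is_ok());
    // An unsupported compression format is rejected up front.
    assert_eq!(
        check_dump_extension(Path::new("dump.sql.bz2")),
        Err("Unsupported file type (bz2)".to_owned())
    );
}
```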
### Comment 2
<location path="common/src/decompress.rs" line_range="121-130" />
<code_context>
+}
+
+/// Take a file, return a wrapped [`Read`], and wrap that with the required compression decoder.
+pub fn decompress_read(path: impl AsRef<Path>) -> anyhow::Result<Box<dyn Read + Send>> {
+ let path = path.as_ref();
+ let source = std::fs::File::open(path)?;
+ let source = std::io::BufReader::new(source);
+
+ Ok(match path.extension().and_then(|ext| ext.to_str()) {
+ Some("xz") => Box::new(lzma_rust2::XzReader::new(source, false)),
+ Some("gz") => Box::new(flate2::read::GzDecoder::new(source)),
+ // Anything else could be .sql, .tar, or an unsupported compression format.
+ // In that case, the following code would fail to understand the compressed content.
+ None | Some(_) => Box::new(source),
+ })
+}
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Unknown file extensions are treated as plain data, which may mask compression mismatches
In the sync path, any non-`xz`/`gz` extension (e.g. `.bz2`) hits the `None | Some(_)` arm and is treated as uncompressed, so a misconfigured or unsupported extension will only fail later when consumers parse the data. Consider either restricting the `Some(_)` arm to known extensions and erroring on everything else, or at least logging a warning when an unrecognized extension is present.
Suggested implementation:
```rust
+/// Take a file, return a wrapped [`Read`], and wrap that with the required compression decoder.
+pub fn decompress_read(path: impl AsRef<Path>) -> anyhow::Result<Box<dyn Read + Send>> {
+ let path = path.as_ref();
+ let source = std::fs::File::open(path)?;
+ let source = std::io::BufReader::new(source);
+
+ Ok(match path.extension().and_then(|ext| ext.to_str()) {
+ Some("xz") => Box::new(lzma_rust2::XzReader::new(source, false)),
+ Some("gz") => Box::new(flate2::read::GzDecoder::new(source)),
+ // No extension: treat as uncompressed input (e.g. `.sql`, `.tar`).
+ None => Box::new(source),
+ // Unknown/unsupported extension (e.g. `.bz2`): warn and treat as uncompressed.
+ Some(ext) => {
+ warn!(
+ "Unknown compression extension `{}` for path {:?}; treating as uncompressed input",
+ ext, path
+ );
+ Box::new(source)
+ }
+ })
+}
```
```rust
Ok(match path.extension().and_then(|ext| ext.to_str()) {
Some("xz") => Box::pin(async_compression::tokio::bufread::LzmaDecoder::new(source)),
Some("gz") => Box::pin(async_compression::tokio::bufread::GzipDecoder::new(source)),
// No extension: treat as uncompressed input (e.g. `.sql`, `.tar`).
None => Box::pin(source),
// Unknown/unsupported extension (e.g. `.bz2`): warn and treat as uncompressed.
Some(ext) => {
warn!(
"Unknown compression extension `{}` for path {:?}; treating as uncompressed input",
ext, path
);
Box::pin(source)
}
})
}
```
1. Ensure a logging macro `warn!` is in scope for this module. For example, if the project uses `tracing`, add:
- `use tracing::warn;`
If it uses the `log` crate instead, add:
- `use log::warn;`
2. If there is a central logging style (structured fields, custom spans, etc.), adjust the `warn!` calls to match that convention.
</issue_to_address>
### Comment 3
<location path="migration/src/data/mod.rs" line_range="43-47" />
<code_context>
#[derive(Clone, Debug, PartialEq, Eq, clap::Parser)]
pub struct Options {
/// Number of concurrent documents being processes
- #[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "5")]
- pub concurrent: NonZeroUsize,
+ ///
+ /// If the value is zero, use the number of logical CPUs
+ #[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "0")]
+ pub concurrent: usize,
/// The instance number of the current runner (zero based)
</code_context>
<issue_to_address>
**question (bug_risk):** Default concurrency differs between programmatic `Default` and CLI/env default, which may confuse behavior
`Options::default()` still sets `concurrent` to 5, while the clap default is now `0` (interpreted as `num_cpus::get()` at runtime). So programmatic defaults and CLI/env defaults diverge. If this isn’t intentional, please align them (e.g., have `Default` use the `0` → `num_cpus` behavior, or change the clap default to `5`).
</issue_to_address>
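A sketch of the "`0` means all CPUs" resolution, using only the standard library (`std::thread::available_parallelism`) instead of the `num_cpus` crate the migration code references; the helper name is hypothetical:

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Resolve the configured concurrency: `0` means "use all logical CPUs".
/// Keeping this in one helper makes it easy for `Options::default()` and
/// the clap/env default to agree.
fn effective_concurrency(configured: usize) -> usize {
    if configured == 0 {
        thread::available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(1) // fall back to serial if the count is unavailable
    } else {
        configured
    }
}

fn main() {
    assert_eq!(effective_concurrency(5), 5);
    assert!(effective_concurrency(0) >= 1);
}
```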
### Comment 4
<location path="modules/storage/src/service/fs.rs" line_range="121-124" />
<code_context>
for compression in &self.read_compressions {
target.set_extension(compression.extension());
if try_exists(&target).await? {
+ log::debug!("Located: {}", target.display());
return Ok(Some((target, *compression)));
}
}
+ log::info!("Missing - base: {target:?}");
Ok(None)
}
</code_context>
<issue_to_address>
**suggestion (performance):** Using `info!` for missing file paths may be noisy in normal operation
Since `find_file` will be called often and missing compressed variants is expected, logging an `info!` on every miss could overwhelm production logs. Consider downgrading this to `debug!` or removing it unless you truly need it at info level.
```suggestion
    }
    log::debug!("Missing - base: {target:?}");
    Ok(None)
}
```
</issue_to_address>
### Comment 5
<location path="migration/src/data/run.rs" line_range="41-50" />
<code_context>
+ }
+}
+
+impl Database {
+ pub async fn try_into_connection(self) -> Result<DatabaseConnection, DbErr> {
+ Ok(match self {
+ Self::Config { url, schema } => {
+ let schema = schema.clone().unwrap_or_else(|| "public".to_owned());
+
+ let connect_options = ConnectOptions::new(url)
+ .set_schema_search_path(schema)
+ .to_owned();
+
+ sea_orm::Database::connect(connect_options).await?
+ }
+ Self::Provided(database) => database.clone(),
+ })
+ }
</code_context>
<issue_to_address>
**suggestion (performance):** Unnecessary cloning of `schema` in `Config` arm and potential simplification of conversion
Inside `Config { url, schema }`, `schema` is already owned, so you can drop the clone and use `let schema = schema.unwrap_or_else(|| "public".to_owned());`. More broadly, `self.database.clone().try_into_connection()` only clones because `try_into_connection` takes `self` by value; if ownership transfer isn’t required, making it `&self` would remove the need to clone the enum (which may matter if `DatabaseConnection` is expensive to clone).
Suggested implementation:
```rust
impl Database {
pub async fn try_into_connection(&self) -> Result<DatabaseConnection, DbErr> {
Ok(match self {
Self::Config { url, schema } => {
let schema = schema
.as_ref()
.cloned()
.unwrap_or_else(|| "public".to_owned());
let connect_options = ConnectOptions::new(url.clone())
.set_schema_search_path(schema)
.to_owned();
sea_orm::Database::connect(connect_options).await?
}
Self::Provided(database) => database.clone(),
})
}
}
```
Anywhere `try_into_connection` is called, remove `.clone()` on the `Database` value and call it by reference instead. For example, change `self.database.clone().try_into_connection().await?` to `self.database.try_into_connection().await?`. Also ensure that any other direct uses of `schema` and `url` within `Config` arm remain valid now that the method takes `&self` (hence the `url.clone()` above to satisfy ownership for `ConnectOptions::new`).
</issue_to_address>
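The borrowing suggestion can be illustrated on a simplified `Database` enum; `Provided` holds a `String` here as a stand-in for `DatabaseConnection`, and `connection_url` replaces the real async connect call:

```rust
// Simplified stand-in for the real enum in migration/src/data/run.rs.
enum Database {
    Config { url: String, schema: Option<String> },
    Provided(String), // stand-in for DatabaseConnection
}

impl Database {
    // Taking `&self` means callers no longer need `self.database.clone()`;
    // only the cheap-to-clone pieces are copied inside.
    fn connection_url(&self) -> String {
        match self {
            Self::Config { url, schema } => {
                let schema = schema.clone().unwrap_or_else(|| "public".to_owned());
                format!("{url}?search_path={schema}")
            }
            Self::Provided(conn) => conn.clone(),
        }
    }
}

fn main() {
    let db = Database::Config {
        url: "postgres://localhost".to_owned(),
        schema: None,
    };
    assert_eq!(db.connection_url(), "postgres://localhost?search_path=public");
    // `db` is still usable afterwards - no enum clone was required.
    let provided = Database::Provided("conn-handle".to_owned());
    assert_eq!(provided.connection_url(), "conn-handle");
}
```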
Force-pushed befb880 → b33da8f

Force-pushed b33da8f → 1293af9
Tests with larger dataset run out of disk space.
Force-pushed eb3ed45 → fc85e6b
Summary by Sourcery
Add snapshot-based migration test infrastructure, new DS4-based data dump tests for migration m0002010, and related performance test; refactor dump download and decompression utilities; and update CI and configuration to support long-running tests and improved coverage.
New Features:

Bug Fixes:

Enhancements:

Build:

- Add `long_running` features in relevant crates and wire them into tests guarded by `cfg_attr`, and ensure coverage runs with `--all-features`.

CI:

- Add a `check` job running `cargo check` and `cargo clippy` before the main CI job, and bump checkout actions to v5.

Documentation:

- Document the `long_running` feature flag.

Tests:

- Guard long-running tests behind the `long_running` feature flag so they can be opted into without impacting regular CI runs.

Chores:
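The `long_running` gating mentioned in the summary can look like the sketch below; the feature name comes from this PR, while the module name, test name, and exact attribute placement in the crate are assumptions:

```rust
// Variant 1: compile the whole test module only when the feature is enabled.
#[cfg(feature = "long_running")]
mod ds4_tests {
    // expensive DS4-based migration tests would live here
}

// Variant 2: keep the test compiled, but mark it ignored without the feature:
//
//   #[cfg_attr(not(feature = "long_running"), ignore)]
//   #[test]
//   fn migration_m0002010_on_ds4_dump() { /* ... */ }

fn main() {
    // Built without `--features long_running`, the gate stays closed and
    // regular CI runs are unaffected.
    assert!(!cfg!(feature = "long_running"));
}
```

Variant 2 has the advantage that `cargo test` still lists the test (as ignored), so the opt-in is discoverable.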